package com.myhadoop;

/**
 * Project:     MapReduce01
 * Class:       SplitByLine
 * Description: Input format that cuts every input file into splits of a fixed number of lines.
 * Created:     2021/12/6 6:53 PM
 * Author:      allen
 * E-mail:      allengao@pku.edu.cn
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/*
 * Created with IntelliJ IDEA.
 * User: Isaac Li
 * Date: 12/20/12, 1:25 PM
 */
/**
 * Input format that hands each map task a fixed number of consecutive lines.
 *
 * <p>{@link #getSplits(JobContext)} scans every input file once and cuts it into
 * {@link FileSplit}s of N lines each, where N is read from the job configuration key
 * {@code mapred.line.input.format.linespermap} (default 3). The last split of a file
 * may hold fewer than N lines. Records are produced by {@link LineRecordReader}.
 *
 * <p>If N is zero or negative the line counter never matches it, so each non-empty
 * file ends up as a single split.
 */
public class SplitByLine extends FileInputFormat<LongWritable, Text> {
    /** Configuration key holding the number of lines per split. */
    private static final String LINES_PER_SPLIT_KEY = "mapred.line.input.format.linespermap";

    /** Number of lines per split used when the configuration key is not set. */
    private static final int DEFAULT_LINES_PER_SPLIT = 3;

    /**
     * Creates the reader that turns a split into (byte offset, line) records.
     *
     * @param inputSplit         the split to read (unused here)
     * @param taskAttemptContext the task context (unused here)
     * @return a new {@link LineRecordReader}
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new LineRecordReader();
    }

    /**
     * Builds the splits for all input files of the job, N lines per split.
     *
     * @param job the job whose input paths and configuration are used
     * @return the splits of all input files, in the order the files are listed
     * @throws IOException if an input file cannot be listed, opened or read
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        // Read once into a local instead of mutating instance state per file.
        int linesPerSplit = conf.getInt(LINES_PER_SPLIT_KEY, DEFAULT_LINES_PER_SPLIT);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            addSplitsForFile(file.getPath(), conf, linesPerSplit, splits);
        }
        System.out.println("Total # of splits: " + splits.size());
        return splits;
    }

    /** Scans one file line by line and appends a split for every {@code linesPerSplit} lines. */
    private static void addSplitsForFile(Path path, Configuration conf, int linesPerSplit,
                                         List<InputSplit> splits) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path);
        LineReader lr = null;
        try {
            lr = new LineReader(in, conf);
            Text line = new Text();
            int numLines = 0;   // lines collected for the split being built
            long begin = 0;     // byte offset where the current split starts
            long length = 0;    // bytes collected for the current split
            int num;
            // readLine returns the number of bytes consumed, 0 at end of file.
            while ((num = lr.readLine(line)) > 0) {
                numLines++;
                length += num;
                if (numLines == linesPerSplit) {
                    splits.add(createFileSplit(path, begin, length));
                    begin += length;
                    length = 0;
                    numLines = 0;
                }
            }
            // Trailing lines that did not fill a whole split.
            if (numLines != 0) {
                splits.add(createFileSplit(path, begin, length));
            }
        } finally {
            if (lr != null) {
                lr.close();
            } else {
                // The LineReader was never created, so the raw stream must be closed here.
                in.close();
            }
        }
    }

    /**
     * Creates a split whose boundaries are moved back by one byte.
     *
     * <p>{@link LineRecordReader} discards the first line of every split that does not
     * start at offset 0 and keeps reading while its position is at or before the split
     * end, so it consumes one line beyond a split that ends exactly on a line boundary.
     * Shifting the boundary back by one byte makes each split yield exactly the lines
     * counted for it. Hadoop's own {@code NLineInputFormat} applies the same adjustment.
     */
    private static FileSplit createFileSplit(Path path, long begin, long length) {
        return (begin == 0)
                ? new FileSplit(path, begin, length - 1, new String[]{})
                : new FileSplit(path, begin - 1, length, new String[]{});
    }
}
